package com.huan.flink;

import com.huan.flink.map.Product;
import com.huan.flink.map.ProductMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * flink 中窗口的使用
 *
 * @author huan.fu
 * @date 2024/1/6 - 11:36
 */
public class WindowApiApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为2
        environment.setParallelism(2);

        SingleOutputStreamOperator<Product> streamDS = environment.socketTextStream("localhost", 9999)
                .map(new ProductMapFunction());

        KeyedStream<Product, Integer> streamKS = streamDS.keyBy((KeySelector<Product, Integer>) Product::getProductId);

        // 1、指定 窗口分配器 指定使用哪一种窗口 时间、计数、滑动、滚动或者会话
        // 1.1 没有 keyBy 的窗口，窗口内所有数据都会进入同一个子任务，并行度只能为1
        // streamDS.windowAll();
        // 1.2 有 keyBy 的窗口，每个key上都定义了一组窗口，各自独立的进行计算

        // 基于时间的窗口

        // 基于 "处理时间" 语义的 滚动 窗口，窗口大小为10s
        streamKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // 基于 "处理时间" 语义的 滑动 窗口，窗口大小为10s，滑动步长为2s
        streamKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
        // 基于 "处理时间" 语义的 会话 窗口，5s钟没有数据进来，认为前面的一个窗口结束了，之后的数据就是在一个新的窗口中。
        streamKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        // streamKS.window(ProcessingTimeSessionWindows.withDynamicGap());

        // 基于计数的

        // 滑动窗口，窗口大小为5， 即每5个元素划分到一个窗口中。
        streamKS.countWindow(5);
        // 滑动窗口，窗口大小为5， 滑动步长为2
        streamKS.countWindow(5, 2);
        // 全局窗口，计数的底层就是使用这个使用，需要自定义触发器
        streamKS.window(GlobalWindows.create());

        // 2、指定 窗口分配器 窗口内的数据的计算逻辑

        environment.execute("window api");
    }

}
